Conversation
There was a problem hiding this comment.
Hey - I've found 1 issue, and left some high level feedback:
- The reconnect logic in
run()currently sleeps for a fixed 2 seconds after anyaiohttp.ClientConnectionError; consider using a simple backoff strategy (or making the delay configurable) so that repeated failures don't result in a tight reconnect loop or unnecessarily long pauses. - Typing session state in
_typing_statesis never pruned for users that no longer have owners/tasks; you may want to drop emptyTypingSessionStateentries once bothownersand associated tasks are cleared to avoid unbounded growth over long-running processes.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The reconnect logic in `run()` currently sleeps for a fixed 2 seconds after any `aiohttp.ClientConnectionError`; consider using a simple backoff strategy (or making the delay configurable) so that repeated failures don't result in a tight reconnect loop or unnecessarily long pauses.
- Typing session state in `_typing_states` is never pruned for users that no longer have owners/tasks; you may want to drop empty `TypingSessionState` entries once both `owners` and associated tasks are cleared to avoid unbounded growth over long-running processes.
## Individual Comments
### Comment 1
<location path="astrbot/core/platform/sources/weixin_oc/weixin_oc_adapter.py" line_range="53" />
<code_context>
error: str | None = None
+@dataclass
+class TypingSessionState:
+ ticket: str | None = None
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the new typing-related logic into a dedicated manager and clarifying TypingSessionState transitions to keep the adapter simpler and the state machine easier to follow.
You can reduce the new complexity substantially by explicitly encapsulating the typing orchestration, while keeping behavior identical.
### 1. Extract a dedicated typing manager
Right now `WeixinOCAdapter` owns:
- `_typing_states`
- `TypingSessionState`
- all the orchestration methods (`_get_typing_state`, `_ensure_typing_ticket`, `_run_typing_keepalive`, `start_typing`, `stop_typing`, `_cleanup_typing_tasks`, …)
You can move these into a small helper class that the adapter delegates to. This keeps the adapter focused on login/polling/message handling and isolates the state machine.
Example sketch:
```python
@dataclass
class TypingSessionState:
ticket: str | None = None
ticket_context_token: str | None = None
refresh_after: float = 0.0
keepalive_task: asyncio.Task | None = None
cancel_task: asyncio.Task | None = None
owners: set[str] = field(default_factory=set)
lock: asyncio.Lock = field(default_factory=asyncio.Lock)
class WeixinTypingManager:
def __init__(
self,
client: WeixinOCClient,
*,
adapter_id: str,
shutdown_event: asyncio.Event,
keepalive_interval_s: int,
ticket_ttl_s: int,
context_tokens: dict[str, str],
) -> None:
self._client = client
self._adapter_id = adapter_id
self._shutdown_event = shutdown_event
self._keepalive_interval_s = keepalive_interval_s
self._ticket_ttl_s = ticket_ttl_s
self._context_tokens = context_tokens
self._states: dict[str, TypingSessionState] = {}
# move _get_typing_state, _typing_supported_for, _ensure_typing_ticket,
# _send_typing_state, _run_typing_keepalive, _typing_keepalive_loop,
# _delayed_cancel_typing, start_typing, stop_typing, _cleanup_typing_tasks
# into methods here, switching self.client → self._client,
# self.meta().id → self._adapter_id, self._typing_states → self._states, etc.
```
Adapter usage then becomes a small surface:
```python
class WeixinOCAdapter(Platform):
def __init__(...):
...
self._typing_manager = WeixinTypingManager(
client=self.client,
adapter_id=self.meta().id,
shutdown_event=self._shutdown_event,
keepalive_interval_s=self._typing_keepalive_interval_s,
ticket_ttl_s=self._typing_ticket_ttl_s,
context_tokens=self._context_tokens,
)
async def start_typing(self, user_id: str, owner_id: str) -> None:
await self._typing_manager.start_typing(user_id, owner_id)
async def stop_typing(self, user_id: str, owner_id: str) -> None:
await self._typing_manager.stop_typing(user_id, owner_id)
async def run(self) -> None:
...
finally:
await self._typing_manager.cleanup()
await self.client.close()
async def terminate(self) -> None:
self._shutdown_event.set()
await self._typing_manager.cleanup()
```
This is mostly mechanical (rename `self` references, pass dependencies via `__init__`) and does not change behavior, but it cuts the adapter’s responsibility surface significantly.
### 2. Localize state transitions on `TypingSessionState`
A lot of complexity comes from repeated patterns manipulating `owners`, `keepalive_task`, and `cancel_task` under locks. You can move these into small methods on `TypingSessionState` (or on the manager operating on a single state) to make the state machine more explicit and reduce repeated branching.
For example:
```python
@dataclass
class TypingSessionState:
...
# Invariants:
# - if owners is empty, keepalive_task is None
# - cancel_task is only non-None while a cancel is scheduled but not sent/completed
def has_active_owners(self) -> bool:
return bool(self.owners)
def add_owner(self, owner_id: str) -> bool:
"""Returns True if this transitioned from 0 → 1 owners."""
before = len(self.owners)
self.owners.add(owner_id)
return before == 0 and len(self.owners) == 1
def remove_owner(self, owner_id: str) -> bool:
"""Returns True if this transitioned from 1 → 0 owners."""
if owner_id not in self.owners:
return False
self.owners.remove(owner_id)
return not self.owners
```
Then `start_typing` / `stop_typing` logic becomes easier to follow:
```python
async def start_typing(self, user_id: str, owner_id: str) -> None:
state = self._get_typing_state(user_id)
cancel_task: asyncio.Task | None = None
async with state.lock:
if not self._typing_supported_for(user_id):
return
first_owner = state.add_owner(owner_id)
if not first_owner:
# already active; nothing else to do
return
# remaining logic unchanged, but now you know this is the 0→1 transition
```
```python
async def stop_typing(self, user_id: str, owner_id: str) -> None:
state = self._states.get(user_id)
if state is None:
return
task: asyncio.Task | None = None
async with state.lock:
last_owner = state.remove_owner(owner_id)
if not last_owner:
return
task = state.keepalive_task
state.keepalive_task = None
...
```
This doesn’t change semantics but encodes the important transitions (`0→1`, `1→0`) explicitly, so you don’t need to mentally reconstruct them from raw set length checks in multiple places.
### 3. Use `_cancel_task_safely` consistently
You already created `_cancel_task_safely`, but some places still cancel tasks directly and await later. Centralizing the pattern makes the async lifecycle easier to reason about and eliminates subtle differences.
Example in `_cleanup_typing_tasks`:
```python
async def _cleanup_typing_tasks(self) -> None:
tasks: list[asyncio.Task] = []
cancels: list[tuple[str, str]] = []
for user_id, state in list(self._states.items()):
...
if state.keepalive_task is not None:
tasks.append(state.keepalive_task)
state.keepalive_task = None
if state.cancel_task is not None:
tasks.append(state.cancel_task)
state.cancel_task = None
for task in tasks:
await self._cancel_task_safely(
task,
log_message="weixin_oc(%s): typing cleanup failed",
log_args=(self._adapter_id,),
)
```
If you move the logic into the manager, it’s easy to ensure that any place that cancels tasks uses this helper, instead of mixing direct `task.cancel()` calls with separate awaiting logic.
---
These steps keep all current behavior (including error handling and retry semantics) but:
- decouple typing from the adapter,
- make the per-user state machine explicit,
- and concentrate task/lock orchestration in a single, smaller component.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| error: str | None = None | ||
|
|
||
|
|
||
| @dataclass |
There was a problem hiding this comment.
issue (complexity): Consider extracting the new typing-related logic into a dedicated manager and clarifying TypingSessionState transitions to keep the adapter simpler and the state machine easier to follow.
You can reduce the new complexity substantially by explicitly encapsulating the typing orchestration, while keeping behavior identical.
1. Extract a dedicated typing manager
Right now WeixinOCAdapter owns:
_typing_statesTypingSessionState- all the orchestration methods (
_get_typing_state,_ensure_typing_ticket,_run_typing_keepalive,start_typing,stop_typing,_cleanup_typing_tasks, …)
You can move these into a small helper class that the adapter delegates to. This keeps the adapter focused on login/polling/message handling and isolates the state machine.
Example sketch:
@dataclass
class TypingSessionState:
ticket: str | None = None
ticket_context_token: str | None = None
refresh_after: float = 0.0
keepalive_task: asyncio.Task | None = None
cancel_task: asyncio.Task | None = None
owners: set[str] = field(default_factory=set)
lock: asyncio.Lock = field(default_factory=asyncio.Lock)
class WeixinTypingManager:
def __init__(
self,
client: WeixinOCClient,
*,
adapter_id: str,
shutdown_event: asyncio.Event,
keepalive_interval_s: int,
ticket_ttl_s: int,
context_tokens: dict[str, str],
) -> None:
self._client = client
self._adapter_id = adapter_id
self._shutdown_event = shutdown_event
self._keepalive_interval_s = keepalive_interval_s
self._ticket_ttl_s = ticket_ttl_s
self._context_tokens = context_tokens
self._states: dict[str, TypingSessionState] = {}
# move _get_typing_state, _typing_supported_for, _ensure_typing_ticket,
# _send_typing_state, _run_typing_keepalive, _typing_keepalive_loop,
# _delayed_cancel_typing, start_typing, stop_typing, _cleanup_typing_tasks
# into methods here, switching self.client → self._client,
# self.meta().id → self._adapter_id, self._typing_states → self._states, etc.Adapter usage then becomes a small surface:
class WeixinOCAdapter(Platform):
def __init__(...):
...
self._typing_manager = WeixinTypingManager(
client=self.client,
adapter_id=self.meta().id,
shutdown_event=self._shutdown_event,
keepalive_interval_s=self._typing_keepalive_interval_s,
ticket_ttl_s=self._typing_ticket_ttl_s,
context_tokens=self._context_tokens,
)
async def start_typing(self, user_id: str, owner_id: str) -> None:
await self._typing_manager.start_typing(user_id, owner_id)
async def stop_typing(self, user_id: str, owner_id: str) -> None:
await self._typing_manager.stop_typing(user_id, owner_id)
async def run(self) -> None:
...
finally:
await self._typing_manager.cleanup()
await self.client.close()
async def terminate(self) -> None:
self._shutdown_event.set()
await self._typing_manager.cleanup()This is mostly mechanical (rename self references, pass dependencies via __init__) and does not change behavior, but it cuts the adapter’s responsibility surface significantly.
2. Localize state transitions on TypingSessionState
A lot of complexity comes from repeated patterns manipulating owners, keepalive_task, and cancel_task under locks. You can move these into small methods on TypingSessionState (or on the manager operating on a single state) to make the state machine more explicit and reduce repeated branching.
For example:
@dataclass
class TypingSessionState:
...
# Invariants:
# - if owners is empty, keepalive_task is None
# - cancel_task is only non-None while a cancel is scheduled but not sent/completed
def has_active_owners(self) -> bool:
return bool(self.owners)
def add_owner(self, owner_id: str) -> bool:
"""Returns True if this transitioned from 0 → 1 owners."""
before = len(self.owners)
self.owners.add(owner_id)
return before == 0 and len(self.owners) == 1
def remove_owner(self, owner_id: str) -> bool:
"""Returns True if this transitioned from 1 → 0 owners."""
if owner_id not in self.owners:
return False
self.owners.remove(owner_id)
return not self.ownersThen start_typing / stop_typing logic becomes easier to follow:
async def start_typing(self, user_id: str, owner_id: str) -> None:
state = self._get_typing_state(user_id)
cancel_task: asyncio.Task | None = None
async with state.lock:
if not self._typing_supported_for(user_id):
return
first_owner = state.add_owner(owner_id)
if not first_owner:
# already active; nothing else to do
return
# remaining logic unchanged, but now you know this is the 0→1 transitionasync def stop_typing(self, user_id: str, owner_id: str) -> None:
state = self._states.get(user_id)
if state is None:
return
task: asyncio.Task | None = None
async with state.lock:
last_owner = state.remove_owner(owner_id)
if not last_owner:
return
task = state.keepalive_task
state.keepalive_task = None
...This doesn’t change semantics but encodes the important transitions (0→1, 1→0) explicitly, so you don’t need to mentally reconstruct them from raw set length checks in multiple places.
3. Use _cancel_task_safely consistently
You already created _cancel_task_safely, but some places still cancel tasks directly and await later. Centralizing the pattern makes the async lifecycle easier to reason about and eliminates subtle differences.
Example in _cleanup_typing_tasks:
async def _cleanup_typing_tasks(self) -> None:
tasks: list[asyncio.Task] = []
cancels: list[tuple[str, str]] = []
for user_id, state in list(self._states.items()):
...
if state.keepalive_task is not None:
tasks.append(state.keepalive_task)
state.keepalive_task = None
if state.cancel_task is not None:
tasks.append(state.cancel_task)
state.cancel_task = None
for task in tasks:
await self._cancel_task_safely(
task,
log_message="weixin_oc(%s): typing cleanup failed",
log_args=(self._adapter_id,),
)If you move the logic into the manager, it’s easy to ensure that any place that cancels tasks uses this helper, instead of mixing direct task.cancel() calls with separate awaiting logic.
These steps keep all current behavior (including error handling and retry semantics) but:
- decouple typing from the adapter,
- make the per-user state machine explicit,
- and concentrate task/lock orchestration in a single, smaller component.
There was a problem hiding this comment.
Code Review
This pull request implements a typing indicator mechanism across the platform, with a specific implementation for the weixin_oc adapter. Key changes include the addition of a stop_typing method to the base message event, state management for typing sessions (including keep-alive and delayed cancellation) in the WeChat adapter, and integration into the internal processing pipeline to ensure typing status is correctly managed during LLM requests. Additionally, the PR includes error handling for connection issues in the WeChat adapter and comprehensive unit tests for the new typing logic. I have no feedback to provide.
修复 #7022 中
weixin_oc在网络或代理短暂切换后断连并停止运行的问题。此前WeixinOCAdapter.run()只会在入站长轮询出现asyncio.TimeoutError时继续轮询(#6915 已覆盖该场景),但像aiohttp.ServerDisconnectedError这类可恢复的连接异常仍会直接跳出循环,导致适配器永久停止。Modifications / 改动点
在
astrbot/core/platform/sources/weixin_oc/weixin_oc_adapter.py中扩展了入站轮询的异常恢复逻辑:当WeixinOCAdapter.run()捕获到aiohttp.ClientConnectionError时,会记录最近一次入站错误、输出 warning 日志、关闭当前客户端会话,并在短暂等待后继续轮询。保留了非网络异常的原有致命错误路径,避免将业务逻辑错误或其他非预期异常误判为可恢复问题而被静默吞掉。
在
tests/unit/test_weixin_oc_typing.py中补充了回归测试,覆盖以下场景:aiohttp.ServerDisconnectedError后能够恢复轮询沿用已合并的 fix: keep weixin_oc polling after inbound timeouts #6915 超时恢复模式。
This is NOT a breaking change. / 这不是一个破坏性变更。
Screenshots or Test Results / 运行截图或测试结果
Checklist / 检查清单
😊 If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
/ 如果 PR 中有新加入的功能,已经通过 Issue / 邮件等方式和作者讨论过。
👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
/ 我的更改经过了良好的测试,并已在上方提供了“验证步骤”和“运行截图”。
🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in
requirements.txtandpyproject.toml./ 我确保没有引入新依赖库,或者引入了新依赖库的同时将其添加到
requirements.txt和pyproject.toml文件相应位置。😮 My changes do not introduce malicious code.
/ 我的更改没有引入恶意代码。
Summary by Sourcery
Add robust typing indicator support and connection recovery for the weixin_oc adapter, and ensure agent pipeline correctly manages typing lifecycle across normal and error flows.
New Features:
Bug Fixes:
Enhancements:
Tests: